#include <stdio.h>
#include <string>
#include <signal.h>
#include <atomic>
#include <vector>
#include "uhd_thread.h"
#include "cmdlineparser.h"
#include "tb_interface.h"
#include <uhd.h>
using namespace TASKBUS;
using namespace std;

typedef short SPTYPE;
static std::atomic<bool> stop_signal_called (false);
#define UHD_DO(X) \
{\
	uhd_error e = (X);\
	char buf_errs[512];\
	if (e) { snprintf(buf_errs,sizeof(buf_errs),"Error in line %d, NO %d.",__LINE__,e);\
	std::string ev = __FILE__;\
	ev += ":";\
	ev += buf_errs;\
	return_code = 1;\
	throw ev;\
	}\
	};
void sigint_handler(int code){
	(void)code;
	stop_signal_called = true;
}
void cmd_dealing(uhd_usrp_handle usrp,std::vector<unsigned char> & cmd);
static char error_string[512];
//Speed payback
std::atomic<double> curr_rxfreq[2]{0,0}, curr_txfreq[2]{0,0};
std::atomic<int> curr_rx_payback[2]{0,0}, curr_tx_payback[2]{0,0};
std::atomic<double> freq_speed = 0;
static const double UHD_2PI = 2*3.14159265358979323846;
int do_iio(const cmdlineParser & args)
{
	int return_code = EXIT_SUCCESS;
	std::string dev_args = args.toString("dev_args","");

	const long long time_sync =2;//For multi channel-Sync;
	//RX Channels, Default 0
	std::vector<size_t> rx_channels = args.toUInt64Array("rx_channels");
	adjuestUnit<size_t>(rx_channels,0);
	int rx_channel_count = rx_channels.size();
	if (rx_channel_count>2)
		rx_channel_count = 2;
	//RX Sample Rate in Hz
	double rx_rate		=   args.toDouble("rx_rate",1)*1e6;
	//RX Freq in MHz --> Hz
	std::vector<double> rx_freq      =   args.toDoubleArray("rx_rf");
	adjuestUnit<double> (rx_channel_count,rx_freq,1000,1e6);
	//Rx Gain
	std::vector<double> rx_gain      =   args.toDoubleArray("rx_gain");
	adjuestUnit<double> (rx_channel_count,rx_gain,30);
	//AGC
	std::vector<int>    rx_agc		=   args.toIntArray("rx_agc");
	adjuestUnit<int> (rx_channel_count,rx_agc,0);
	//RX Bw in MHz --> Hz
	std::vector<double> rx_bw		=   args.toDoubleArray("rx_bw");
	adjuestUnit<double> (rx_channel_count,rx_bw ,0 ,1e6);
	//RX Antenna
	std::vector<std::string> rx_atn  =   args.toStringArray("rx_atn");
	adjustString (rx_channel_count,rx_atn,"RX2");
	//RX switch
	bool  rx_on         =   args.toInt("rx_on",0)?true:false;
	//RX Buffer Len
	quint64   rx_frame      =   args.toUInt64("rx_frame",10000);


	//TX Channels, Default 0
	std::vector<size_t> tx_channels = args.toUInt64Array("tx_channels");
	adjuestUnit<size_t>(tx_channels,0);
	int tx_channel_count = tx_channels.size();
	if(tx_channel_count>2)
		tx_channel_count = 2;
	//TX Sample Rate in MHz -->Hz
	double tx_rate		=   args.toDouble("tx_rate",1.0)*1e6;
	//TX Freq in MHz -->Hz
	std::vector<double> tx_freq		=   args.toDoubleArray("tx_rf");
	adjuestUnit<double> (tx_channel_count,tx_freq,2000,1e6);
	//Tx Gain in dB
	std::vector<double> tx_gain      =   args.toDoubleArray("tx_gain");
	adjuestUnit<double> (tx_channel_count,tx_gain,30);
	//TX BW in MHz -->Hz
	std::vector<double> tx_bw		=   args.toDoubleArray("tx_bw");
	adjuestUnit<double> (tx_channel_count,tx_bw,0 ,1e6);
	//TX Antenna
	std::vector<std::string> tx_atn  =   args.toStringArray("tx_atn");
	adjustString (rx_channel_count,tx_atn,"TX/RX");
	//TX switch
	bool  tx_on         =   args.toInt("tx_on",0)?true:false;
	//Tx Buffer Len
	quint64   tx_frame      =   args.toUInt64("tx_frame",10000);

	if (!rx_on && !tx_on)
	{
		fprintf(stderr,"Both RX and TX switch are OFF, nothing can do.\n");
		return 0;
	}
	const quint32 instance	  = args.toUInt("instance",0);
	const quint32 i_wav_rx[2]	  = {args.toUInt("wav_rx0",0),args.toUInt("wav_rx1",0)};
	const quint32 i_wav_tx[2]	  = {args.toUInt("wav_tx0",0),args.toUInt("wav_tx1",0)};
	const quint32 i_txWaterMark = args.toInt("tx_mark",0);
	const quint32 i_orbit_input = args.toUInt("orbit",0);

	//设备句柄
	uhd_usrp_handle usrp = nullptr;
	uhd_rx_streamer_handle rx_streamer = nullptr;
	uhd_rx_metadata_handle rx_meta = nullptr;
	uhd_tx_streamer_handle tx_streamer = nullptr;
	uhd_tx_metadata_handle tx_meta = nullptr;

	//ring buffer in short inters
	//初始化环状缓存器
	const quint64 sz_buffer_all = 128 * 1024 * 1024;
	std::vector<std::vector<SPTYPE> > buf_rx_array;
	std::vector<std::vector<SPTYPE> > buf_tx_array;
	//In IQ Samples, RF io
	quint64 rx_pos = 0, tx_pos = 0,stdout_pos = 0;
	//In IQ Samples, Std IO
	std::vector<quint64> stdin_pos;

	std::vector<void *> rx_buff_ptr, tx_buff_ptr;
	size_t num_rx_samps = 0,num_sps_sent = 0 ;

	for (int ch = 0;ch<rx_channel_count;++ch)
	{
		buf_rx_array.push_back(std::vector<SPTYPE>(sz_buffer_all + 1024*1024,0));
		rx_buff_ptr.push_back(0);
	}
	for (int ch = 0;ch<tx_channel_count;++ch)
	{
		buf_tx_array.push_back(std::vector<SPTYPE>(sz_buffer_all + 1024*1024,0));
		stdin_pos.push_back(0);
		tx_buff_ptr.push_back(0);
	}
	std::mutex push_mtx;
	std::mutex * pMtx = (i_wav_rx[0]+i_wav_rx[1]+i_txWaterMark)>0?&push_mtx:nullptr;

	//要及时启动stdin，以便接收和反馈水位
	//Define StdIn Thread, Producer
	std::function<void()> thread_stdin = [&]()->void
	{
		try{
			double payback_angle = 0;
			//Read STDIN
			while (!stop_signal_called) {
				subject_package_header header;
				std::vector<unsigned char> packagedta = pull_subject(&header);
				if (!is_valid_header(header))
				{
					fprintf(stderr,"Recived BAD Command.");
					fflush(stderr);
					QThread::msleep(100);
					continue;
				}
				if (packagedta.size())
				{
					if ( is_control_subject(header))
					{
						//收到命令进程退出的广播消息,退出
						if (strstr((const char *)packagedta.data(),"function=quit;")!=nullptr)
						{
							fprintf(stderr,"Recived Quit Command.");
							fflush(stderr);
							stop_signal_called = true;
						}
						else
							cmd_dealing(usrp,packagedta);
					}
					else
					{
						for (int ch = 0; ch < tx_channel_count;++ch)
						{
							if (header.subject_id == i_wav_tx[ch])
							{
								SPTYPE * buf_tx_ptr = buf_tx_array[ch].data();
								quint64 sps_rev = packagedta.size()/2/sizeof(SPTYPE);
								short * iq = (short *)(packagedta.data());
								if (curr_tx_payback[ch])
								{
									double pbk = curr_tx_payback[ch];
									SPTYPE (*iq_payback)[2] = (SPTYPE (*)[2])iq;
									const int out_pts = sps_rev;
									for (int spi = 0;spi < out_pts;++spi)
									{
										const double pay_i = cos ( payback_angle ) ;
										const double pay_q = sin ( payback_angle ) ;
										const double res_i = iq_payback[spi][0] * pay_i - iq_payback[spi][1] * pay_q;
										const double res_q = iq_payback[spi][1] * pay_i + iq_payback[spi][0] * pay_q;
										iq_payback[spi][0] = res_i;
										iq_payback[spi][1] = res_q;
										payback_angle += - UHD_2PI * pbk / tx_rate;
										if (payback_angle < -UHD_2PI || payback_angle > UHD_2PI)
											payback_angle = fmod(payback_angle, UHD_2PI);
									}
								}
								memcpy(&buf_tx_ptr[(stdin_pos[ch] * 2)% sz_buffer_all],
										iq,  sps_rev * sizeof(SPTYPE)*2
										);
								const quint64 bgnext = ((stdin_pos[ch] + sps_rev) * 2) % sz_buffer_all;
								if (bgnext < sps_rev * 2 && bgnext>0)
								{
									memcpy(buf_tx_ptr, buf_tx_ptr + sz_buffer_all, bgnext * sizeof(SPTYPE) );
								}
								stdin_pos[ch] += sps_rev;
							}
						}
						if (header.subject_id == i_orbit_input)
						{
							//Freq Payback
							/*!
							 * alt=505840.0124615382;
							 * az=14.092032193806949;
							 * destin=ALL;
							 * el=12.378367847659302;
							 * lat=44.17459623440957;
							 * lon=122.95643447782764;
							 * minel=5;
							 * name=orbit;
							 * ob_alt=0;ob_lat=31.8028926;ob_lon=118.6523438;
							 * range=1560444.815996877;
							 * sid=43770;sname=FOX-1CLIFF (AO-95)      ;
							 * source=orbit;
							 * speed=-6949.003945941497;
							 * utc=1761356490;
							 */
							packagedta.push_back(0);
							QString str = QString::fromUtf8((char *)packagedta.data());
							if (str.contains("name=orbit"))
							{
								int idx = str.indexOf("speed=");
								if (idx>=0)
								{
									int end = str.indexOf(';',idx);
									if (end  <idx + 6)
										end = str.length()-1;
									QString speed_v = str.mid(idx+6,(end-idx-6));
									freq_speed = speed_v.toDouble();
									fprintf(stderr,"FREQ_SPEED=%lf m/s.\n",(double)freq_speed);
								}
								else
									freq_speed = 0;
							}
						}
						if (i_txWaterMark>0)
						{
							qint64 watM = stdin_pos[0] - tx_pos;
							if (tx_channel_count>1)
							{
								qint64 wat2 = stdin_pos[1] - tx_pos;
								if (wat2 < watM)
									watM = wat2;
							}
							TASKBUS::push_subject(
										i_txWaterMark,
										instance,
										sizeof(qint64),
										(unsigned char *) &watM,
										pMtx);

						}
					}
				}
			}
		}
		catch (std::string er)
		{
			fputs(er.c_str(),stderr);
			stop_signal_called = true;
		}
	};

	uhd_io_thread th_wav_stdin(thread_stdin,0);
	th_wav_stdin.start(QThread::HighestPriority);


	try{
		fprintf(stderr, "Creating USRP with args \"%s\"...\n", dev_args.c_str());
		UHD_DO(uhd_usrp_make(&usrp, dev_args.c_str()));

		fflush(stderr);
		// Ctrl+C will exit loop
		signal(SIGINT, &sigint_handler);
		if (rx_on)
		{
			// Create RX streamer
			UHD_DO(uhd_rx_streamer_make(&rx_streamer));
			// Create RX metadata
			UHD_DO(uhd_rx_metadata_make(&rx_meta));
			//RXPer-Channel Settings
			for (int ch = 0; ch < rx_channel_count;++ch)
			{
				fprintf(stderr, "Setting RX Channel: %d=%u...\n",ch, rx_channels[ch]);
				UHD_DO(uhd_usrp_set_rx_antenna(usrp,rx_atn[ch].c_str(),ch));
				fprintf(stderr, "Setting RX Rate: %lf...\n", rx_rate);
				UHD_DO(uhd_usrp_set_rx_rate(usrp, rx_rate, rx_channels[ch]));
				// See what rate actually is
				UHD_DO(uhd_usrp_get_rx_rate(usrp, rx_channels[ch], &rx_rate));
				fprintf(stderr, "Actual RX Rate: %lf...\n", rx_rate);
				// Set gain
				fprintf(stderr, "Setting RX Gain: %lf dB...\n", rx_gain[ch]);
				UHD_DO(uhd_usrp_set_rx_gain(usrp, rx_gain[ch], rx_channels[ch], ""));
				// See what gain actually is
				UHD_DO(uhd_usrp_get_rx_gain(usrp, rx_channels[ch], "", &rx_gain[ch]));
				fprintf(stderr, "Actual RX Gain: %lf...\n", rx_gain[ch]);
				if (rx_agc[ch])
				{
					uhd_usrp_set_rx_agc(usrp,true,rx_channels[ch]);
					uhd_usrp_set_rx_dc_offset_enabled(usrp,true,rx_channels[ch]);
				}
				else
					annouce_handle(
								args.toString("instance","0"),
								"0",
								QString("rxgain_%1").arg(ch).toStdString(),
								QString("%1").arg(rx_gain[ch]).toStdString(),
								QString("rxgain_%1").arg(ch).toStdString(),
								"int",
								"0:100"
								   );
				// Set frequency
				curr_rxfreq[ch] = rx_freq[ch];
				curr_rx_payback[ch] = curr_rxfreq[ch] / 3e8 * freq_speed;
				// Create other necessary structs
				uhd_tune_request_t rx_tune_request =
				{
					/*.target_freq = */rx_freq[ch],
					/*.rf_freq_policy = */UHD_TUNE_REQUEST_POLICY_AUTO,
					/*.rf_freq = */0,
					/*.dsp_freq_policy = */UHD_TUNE_REQUEST_POLICY_AUTO,
					/*.dsp_freq = */0,
					/*.args = */0
				};
				uhd_tune_result_t rx_tune_result;

				fprintf(stderr, "Setting RX frequency: %lf MHz...\n", rx_freq[ch]/1e6);
				UHD_DO(uhd_usrp_set_rx_freq(usrp, &rx_tune_request, rx_channels[ch], &rx_tune_result));
				// See what frequency actually isrx_streamer
				UHD_DO(uhd_usrp_get_rx_freq(usrp, rx_channels[ch], &rx_freq[ch]));
				fprintf(stderr, "Actual RX frequency: %lf MHz...\n", rx_freq[ch] / 1e6);
				annouce_handle(
							args.toString("instance","0"),
							"0",
							QString("rxfreq_%1").arg(ch).toStdString(),
							QString("%1").arg(rx_freq[ch]).toStdString(),
							QString("rxfreq_%1").arg(ch).toStdString(),
							"double",
							"0:6000000000","100000","0"
							   );

				fprintf(stderr, "Setting RX Bandwidth: %lf MHz...\n", rx_bw[ch]/1e6);
				UHD_DO(uhd_usrp_set_rx_bandwidth(usrp, rx_bw[ch], rx_channels[ch]));
				//Band
				UHD_DO(uhd_usrp_get_rx_bandwidth(usrp, rx_channels[ch], &rx_bw[ch]));
				fprintf(stderr, "Actual RX Bandwidth: %lf MHz...\n", rx_bw[ch] / 1e6);
				fflush(stderr);


			}
		}
		if (tx_on)
		{
			// Create TX streamer
			UHD_DO(uhd_tx_streamer_make(&tx_streamer));
			// Create TX metadata
			UHD_DO(uhd_tx_metadata_make(&tx_meta, false, 0,0, false, false));

			//TX Per-Channel Settings
			for (int ch = 0;ch < tx_channel_count;++ch)
			{
				fprintf(stderr, "Setting TX Channel: %d=%lu...\n",ch, tx_channels[ch]);
				UHD_DO(uhd_usrp_set_tx_antenna(usrp,tx_atn[ch].c_str(),ch));
				// Set rate
				fprintf(stderr, "Setting TX Rate: %lf...\n", tx_rate);
				UHD_DO(uhd_usrp_set_tx_rate(usrp, tx_rate, tx_channels[ch]));
				// See what rate actually is
				UHD_DO(uhd_usrp_get_tx_rate(usrp, tx_channels[ch], &tx_rate));
				fprintf(stderr, "Actual TX Rate: %lf...\n\n", tx_rate);
				// Set gain
				fprintf(stderr, "Setting TX Gain: %lf db...\n", tx_gain[ch]);
				UHD_DO(uhd_usrp_set_tx_gain(usrp, tx_gain[ch],tx_channels[ch], ""));
				// See what gain actually is
				UHD_DO(uhd_usrp_get_tx_gain(usrp, tx_channels[ch], "", &tx_gain[ch]));
				fprintf(stderr, "Actual TX Gain: %lf...\n", tx_gain[ch]);
				annouce_handle(
							args.toString("instance","0"),
							"0",
							QString("txgain_%1").arg(ch).toStdString(),
							QString("%1").arg(tx_gain[ch]).toStdString(),
							QString("txgain_%1").arg(ch).toStdString(),
							"int",
							"0:100"
							   );
				curr_txfreq[ch] = tx_freq[ch];
				curr_tx_payback[ch] = curr_txfreq[ch] / 3e8 * freq_speed;
				// Create other necessary structs for TX
				uhd_tune_request_t tx_tune_request = {
					/*.target_freq = */tx_freq[ch],
					/*.rf_freq_policy = */UHD_TUNE_REQUEST_POLICY_AUTO,
					/*.rf_freq = */0,
					/*.dsp_freq_policy = */UHD_TUNE_REQUEST_POLICY_AUTO,
					/*.dsp_freq = */0,
					/*.args = */0
				};
				uhd_tune_result_t tx_tune_result;
				// Set frequency
				fprintf(stderr, "Setting TX frequency: %f MHz...\n", tx_freq[ch] / 1e6);
				UHD_DO(uhd_usrp_set_tx_freq(usrp, &tx_tune_request, tx_channels[ch], &tx_tune_result));

				// See what frequency actually is
				UHD_DO(uhd_usrp_get_tx_freq(usrp, tx_channels[ch], &tx_freq[ch]));
				fprintf(stderr, "Actual TX frequency: %f MHz...\n", tx_freq[ch] / 1e6);
				annouce_handle(
							args.toString("instance","0"),
							"0",
							QString("txfreq_%1").arg(ch).toStdString(),
							QString("%1").arg(tx_freq[ch]).toStdString(),
							QString("txfreq_%1").arg(ch).toStdString(),
							"double",
							"0:6000000000","100000","0"
							   );

				//Band
				fprintf(stderr, "Setting TX Bandwidth: %f MHz...\n", tx_bw[ch]/1e6);
				UHD_DO(uhd_usrp_set_tx_bandwidth(usrp, tx_bw[ch],tx_channels[ch]));

				UHD_DO(uhd_usrp_get_tx_bandwidth(usrp, tx_channels[ch], &tx_bw[ch]));
				fprintf(stderr, "Actual TX Bandwidth: %f MHz...\n", tx_bw[ch] / 1e6);


			}
		}
		char rx_cpu_format[] = "sc16";
		char rx_otw_format[] = "sc16";
		char rx_args[] = "";

		uhd_stream_args_t rx_stream_args = {
			/* .cpu_format = */rx_cpu_format,
			/* .otw_format = */rx_otw_format,
			/* .args = */rx_args,
			/* .channel_list = */rx_channels.data(),
			/*.n_channels = */rx_channel_count
		};
		uhd_stream_cmd_t rx_stream_cmd = {
			/*.stream_mode = */UHD_STREAM_MODE_START_CONTINUOUS,
			/*.num_samps = */0,
			/*.stream_now = */false,
			/*.time_spec_full_secs = */time_sync,
			/*.time_spec_frac_secs = */0
		};

		//char tx_cpu_format[] = "fc32";
		char tx_cpu_format[] = "sc16";
		char tx_otw_format[] = "sc16";
		char tx_args[] = "";

		uhd_stream_args_t tx_stream_args = {
			/*.cpu_format = */tx_cpu_format,
			/*.otw_format = */tx_otw_format,
			/*.args = */tx_args,
			/*.channel_list = */tx_channels.data(),
			/* .n_channels = */tx_channel_count
		};
		size_t rx_sps_buff = 0;
		size_t tx_sps_buff = 0;
		if (rx_on)
		{
			// Set up streamer
			UHD_DO(uhd_usrp_get_rx_stream(usrp, &rx_stream_args, rx_streamer));
			// Set up buffer
			UHD_DO(uhd_rx_streamer_max_num_samps(rx_streamer, &rx_sps_buff));
			fprintf(stderr, "RX Buffer size in samples: %zu\n", rx_sps_buff);
		}
		if (tx_on)
		{
			UHD_DO(uhd_usrp_get_tx_stream(usrp, &tx_stream_args, tx_streamer));
			// Set up buffer
			UHD_DO(uhd_tx_streamer_max_num_samps(tx_streamer, &tx_sps_buff));
			fprintf(stderr, "TX Buffer size in samples: %zu\n", tx_sps_buff);
			fflush(stderr);
		}

		//reset time
		uhd_usrp_set_time_now(usrp,0,0,0);
		//Define RX Thread, Producer
		std::function<void()> thread_rx = [&]()->void
		{
			try{
				// Issue stream command
				fprintf(stderr, "Issuing stream command.\n");
				UHD_DO(uhd_rx_streamer_issue_stream_cmd(rx_streamer, &rx_stream_cmd));

				//Read, RX
				while (!stop_signal_called) {
					//Init Ptr
					for (int ch=0;ch<rx_channel_count;++ch)
						rx_buff_ptr[ch] = (void *) &(buf_rx_array[ch][(rx_pos * 2) % sz_buffer_all]);
					// Handle data
					UHD_DO(uhd_rx_streamer_recv(
							   rx_streamer,
							   rx_buff_ptr.data(),
							   rx_sps_buff,
							   &rx_meta, 5, true, &num_rx_samps));
					//拼接首尾
					const quint64 bgnext = ((rx_pos + num_rx_samps) * 2) % sz_buffer_all;
					if (bgnext < num_rx_samps * 2 && bgnext>0)
						for (int ch=0;ch<rx_channel_count;++ch)
							memcpy(&buf_rx_array[ch][0], (char *)(&buf_rx_array[ch][0]) + sz_buffer_all, bgnext * sizeof(SPTYPE) );
					rx_pos += num_rx_samps;
					uhd_rx_metadata_error_code_t error_code;
					UHD_DO(uhd_rx_metadata_error_code(rx_meta, &error_code));
					if(error_code != UHD_RX_METADATA_ERROR_CODE_NONE){
						fprintf(stderr, "Warning: Error code 0x%x 16was returned during streaming.\n", error_code);
						fputs(error_string,stderr);
						//UHD_DO(uhd_rx_streamer_issue_stream_cmd(rx_streamer, &rx_stream_cmd));
						//stop_signal_called = true;
					}
				}
			}
			catch (std::string er)
			{
				fputs(er.c_str(),stderr);
				stop_signal_called = true;
			}
		};


		//Define StdOut Thread, Consumer
		std::function<void()> thread_stdout = [&]()->void
		{
			try{
				double payback_angle = 0;
				//Read, RX
				while (!stop_signal_called)
				{
					if (stdout_pos + (long long)(rx_frame + rx_sps_buff )  >= rx_pos )
					{
						QThread::msleep(1);
						continue;
					}
					const quint64 bgnext = ((stdout_pos + rx_frame) * 2) % sz_buffer_all;
					for (int ch = 0;ch<rx_channel_count;++ch)
					{
						SPTYPE * buf_rx_ptr = buf_rx_array[ch].data();
						//Dealing with the tail
						if (bgnext < rx_frame * 2 && bgnext>0)
							memcpy( buf_rx_ptr + sz_buffer_all,buf_rx_ptr, bgnext * sizeof(SPTYPE) );
						if (i_wav_rx[ch]>0)
						{
							//freq payback
							if (curr_rx_payback[ch])
							{
								double pbk = curr_rx_payback[ch];
								SPTYPE (*iq_out)[2] = (SPTYPE (*)[2])(& buf_rx_ptr[(stdout_pos * 2)% sz_buffer_all]);
								const int out_pts = rx_frame;
								for (int spi = 0;spi < out_pts;++spi)
								{
									const double pay_i = cos ( payback_angle ) ;
									const double pay_q = sin ( payback_angle ) ;
									const double res_i = iq_out[spi][0] * pay_i - iq_out[spi][1] * pay_q;
									const double res_q = iq_out[spi][1] * pay_i + iq_out[spi][0] * pay_q;
									iq_out[spi][0] = res_i;
									iq_out[spi][1] = res_q;
									payback_angle += - UHD_2PI * pbk / rx_rate;
									if (payback_angle < -UHD_2PI || payback_angle > UHD_2PI)
										payback_angle = fmod(payback_angle, UHD_2PI);
								}
							}
							TASKBUS::push_subject(
										i_wav_rx[ch],
										instance,
										rx_frame * 2 * sizeof(SPTYPE),
										(unsigned char *) &buf_rx_ptr[(stdout_pos * 2)% sz_buffer_all],
										pMtx);
						}
					}
					stdout_pos += rx_frame;
				}
			}
			catch (std::string er)
			{
				fputs(er.c_str(),stderr);
				stop_signal_called = true;
			}
		};


		//Define thread_tx, Consumer
		std::function<void()> thread_tx = [&]()->void
		{
			try{
				while (!stop_signal_called)
				{
					if (i_txWaterMark>0)
					{
						qint64 watM = stdin_pos[0] - tx_pos;
						if (tx_channel_count>1)
						{
							qint64 wat2 = stdin_pos[1] - tx_pos;
							if (wat2 < watM)
								watM = wat2;
						}
						TASKBUS::push_subject(
									i_txWaterMark,
									instance,
									sizeof(qint64),
									(unsigned char *) &watM,
									pMtx);

					}
					bool can_send = true;
					for (int ch=0;ch<tx_channel_count && can_send;++ch)
					{
						SPTYPE * buf_tx_ptr = buf_tx_array[ch].data();
						if (stdin_pos[ch] < (tx_sps_buff + tx_frame)*10)
						{
							can_send = false;
							continue;
						}
						if (tx_pos + (long long) (tx_sps_buff + tx_frame) >= stdin_pos[ch])
						{
							can_send = false;
							continue;
						}
						const quint64 bgnext = ((tx_pos + tx_sps_buff) * 2) % sz_buffer_all;
						//Dealing with the tail
						if (bgnext < tx_sps_buff * 2 && bgnext>0)
						{
							memcpy( buf_tx_ptr + sz_buffer_all, buf_tx_ptr, bgnext * sizeof(SPTYPE) );
						}
						SPTYPE * tx_buff = &buf_tx_ptr[(tx_pos * 2) % sz_buffer_all];
						tx_buff_ptr[ch] = (void *)tx_buff;
					}
					if (!can_send)
					{
						QThread::msleep(1);
					}
					else
					{
						UHD_DO(uhd_tx_streamer_send(tx_streamer,
													(const void **)tx_buff_ptr.data(),
													tx_sps_buff, &tx_meta,0.1, &num_sps_sent));
						tx_pos += num_sps_sent;
					}
				}
			}
			catch(std::string er)
			{
				fputs(er.c_str(),stderr);
				stop_signal_called = true;
			}
		};

		uhd_io_thread th_wav_rx(thread_rx,0);
		uhd_io_thread th_wav_stdout(thread_stdout,0);
		uhd_io_thread th_wav_tx(thread_tx,0);

		if (rx_on)
		{
			th_wav_rx.start(QThread::HighestPriority);
			th_wav_stdout.start(QThread::HighestPriority);
		}
		if (tx_on)
		{
			th_wav_tx.start(QThread::HighestPriority);
		}

		while (!stop_signal_called)
		{
			fprintf(stderr,"\nRX:");
			fprintf (stderr,"%lld,%lld;",rx_pos,stdout_pos);
			fprintf(stderr,"\n");
			fprintf(stderr,"TX:");
			for (int ch = 0;ch<tx_channel_count;++ch)
				fprintf (stderr,"%d:%lld,%lld;",ch,stdin_pos[ch],tx_pos);
			fprintf(stderr,"\n");
			fflush(stderr);
			QThread::msleep(1000);
		}

		th_wav_rx.wait();
		th_wav_stdout.wait();
		th_wav_tx.wait();
	}
	catch(std::string s)
	{
		fputs(s.c_str(),stderr);
	}

	fflush(stderr);

	if (tx_streamer) uhd_tx_streamer_free(&tx_streamer);
	if (rx_streamer) uhd_rx_streamer_free(&rx_streamer);
	if (tx_meta) uhd_tx_metadata_free(&tx_meta);
	if (rx_meta) uhd_rx_metadata_free(&rx_meta);

	if(return_code != EXIT_SUCCESS && usrp != nullptr){
		uhd_usrp_last_error(usrp, error_string, 512);
		fprintf(stderr, "USRP reported the following error: %s\n", error_string);
	}
	uhd_usrp_free(&usrp);

	th_wav_stdin.wait();

	fprintf(stderr, (return_code ? "Failure\n" : "Success\n"));
	fflush(stderr);
	return return_code;
}

void cmd_dealing(uhd_usrp_handle usrp,std::vector<unsigned char> & cmd)
{
	if (!usrp)
		return;
	std::map<std::string,std::string> mcmd = TASKBUS::ctrlpackage_to_map(cmd);
	if (mcmd["function"]=="handle_set")
	{
		if (mcmd["handle"].find("rxgain_")!=mcmd["handle"].npos)
		{
			int channel = atoi(mcmd["handle"].substr(7).c_str());
			int gain = atoi(mcmd["value"].c_str());
			uhd_usrp_set_rx_gain(usrp,gain,channel,"");
			double dgain = 0;
			uhd_usrp_get_rx_gain(usrp, channel, "", &dgain);
			fprintf(stderr, "Actual RX Gain: %lf...\n", dgain);

		}
		else if (mcmd["handle"].find("txgain_")!=mcmd["handle"].npos)
		{
			int channel = atoi(mcmd["handle"].substr(7).c_str());
			int gain = atoi(mcmd["value"].c_str());
			uhd_usrp_set_tx_gain(usrp,gain,channel,"");
			double dgain = 0;
			uhd_usrp_get_tx_gain(usrp, channel, "", &dgain);
			fprintf(stderr, "Actual TX Gain: %lf...\n", dgain);

		}
		else if (mcmd["handle"].find("rxfreq_")!=mcmd["handle"].npos)
		{
			int channel = atoi(mcmd["handle"].substr(7).c_str());
			double freq = atof(mcmd["value"].c_str());
			uhd_tune_request_t rx_tune_request =
			{
				/*.target_freq = */freq,
				/*.rf_freq_policy = */UHD_TUNE_REQUEST_POLICY_AUTO,
				/*.rf_freq = */0,
				/*.dsp_freq_policy = */UHD_TUNE_REQUEST_POLICY_AUTO,
				/*.dsp_freq = */0,
				/*.args = */0
			};
			uhd_tune_result_t rx_tune_result;
			fprintf(stderr, "Setting RX frequency: %lf MHz...\n", freq/1e6);
			uhd_usrp_set_rx_freq(usrp, &rx_tune_request, channel, &rx_tune_result);
			curr_rxfreq[channel] = freq;
			curr_rx_payback[channel] = freq / 3e8 * freq_speed;
		}
		else if (mcmd["handle"].find("txfreq_")!=mcmd["handle"].npos)
		{
			int channel = atoi(mcmd["handle"].substr(7).c_str());
			double freq = atof(mcmd["value"].c_str());
			uhd_tune_request_t tx_tune_request =
			{
				/*.target_freq = */freq,
				/*.rf_freq_policy = */UHD_TUNE_REQUEST_POLICY_AUTO,
				/*.rf_freq = */0,
				/*.dsp_freq_policy = */UHD_TUNE_REQUEST_POLICY_AUTO,
				/*.dsp_freq = */0,
				/*.args = */0
			};
			uhd_tune_result_t rx_tune_result;
			fprintf(stderr, "Setting RX frequency: %lf MHz...\n", freq/1e6);
			uhd_usrp_set_tx_freq(usrp, &tx_tune_request, channel, &rx_tune_result);
			curr_txfreq[channel] = freq;
			curr_tx_payback[channel] = freq / 3e8 * freq_speed;
		}
	}
}
